/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Compact region on request and then run split if appropriate
 */
@InterfaceAudience.Private
public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
    private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);

    // Configuration key (and default) for the number of threads in the large compaction pool.
    public final static String LARGE_COMPACTION_THREADS = "hbase.regionserver.thread.compaction.large";
    public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;

    // Configuration key (and default) for the number of threads in the small compaction pool.
    public final static String SMALL_COMPACTION_THREADS = "hbase.regionserver.thread.compaction.small";
    public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;

    // Configuration key (and default) for the number of threads in the split pool.
    public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
    public final static int SPLIT_THREADS_DEFAULT = 1;

    // Configuration key (and default) for the online-region count above which splits are refused.
    public static final String REGION_SERVER_REGION_SPLIT_LIMIT = "hbase.regionserver.regionSplitLimit";
    public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
    // Configuration key for the compaction on/off switch; read with a default of true.
    public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = "hbase.regionserver.compaction.enabled";

    // Owning region server and the configuration obtained from it at construction time.
    private final HRegionServer server;
    private final Configuration conf;
    // Executors for large compactions, small compactions and splits. The compaction executors
    // are replaced when compactions are switched back on, hence volatile.
    private volatile ThreadPoolExecutor longCompactions;
    private volatile ThreadPoolExecutor shortCompactions;
    private volatile ThreadPoolExecutor splits;

    // Throughput controller handed to every compaction; re-created on configuration change.
    private volatile ThroughputController compactionThroughputController;

    // Current state of the compaction switch, see switchCompaction(boolean).
    private volatile boolean compactionsEnabled;
    /**
     * Splitting should not take place if the total number of regions exceed this.
     * This is not a hard limit to the number of regions but it is a guideline to
     * stop splitting after number of online regions is greater than this.
     */
    private int regionSplitLimit;

    /**
     * @param server
     */
    CompactSplit(HRegionServer server) {
        this.server = server;
        this.conf = server.getConfiguration();
        this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
        createCompactionExecutors();
        createSplitExcecutors();

        // compaction throughput controller
        this.compactionThroughputController = CompactionThroughputControllerFactory.create(server, conf);
    }

    /**
     * Creates the fixed-size, daemon-thread pool that runs split requests. Threads are named
     * after the calling thread with a {@code -splits-N} suffix.
     */
    private void createSplitExcecutors() {
        final String threadNameFormat = Thread.currentThread().getName() + "-splits-%d";
        final int poolSize = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
        this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize,
                new ThreadFactoryBuilder().setNameFormat(threadNameFormat).setDaemon(true).build());
    }

    /**
     * (Re)creates the large and small compaction pools and refreshes the region split limit
     * from the configuration.
     *
     * The large pool is backed by a {@link StealJobQueue}; the small pool is backed by that
     * queue's "steal from" queue, so large-pool workers can pick up work queued for the small
     * pool.
     *
     * @throws IllegalArgumentException if the configured small thread count is not positive
     */
    private void createCompactionExecutors() {
        this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);

        final int configuredLarge = conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT);
        final int largePoolSize = Math.max(1, configuredLarge);
        final int smallPoolSize = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);

        // Both pools need at least one thread (the large size is already clamped above).
        Preconditions.checkArgument(largePoolSize > 0 && smallPoolSize > 0);

        final String namePrefix = Thread.currentThread().getName();
        final StealJobQueue<Runnable> largeQueue = new StealJobQueue<>(COMPARATOR);

        this.longCompactions = new ThreadPoolExecutor(largePoolSize, largePoolSize, 60, TimeUnit.SECONDS, largeQueue,
                new ThreadFactoryBuilder().setNameFormat(namePrefix + "-longCompactions-%d").setDaemon(true).build());
        this.longCompactions.setRejectedExecutionHandler(new Rejection());
        // NOTE(review): presumably pre-started so idle large workers are already polling and can
        // steal from the small queue -- confirm against StealJobQueue.
        this.longCompactions.prestartAllCoreThreads();

        this.shortCompactions = new ThreadPoolExecutor(smallPoolSize, smallPoolSize, 60, TimeUnit.SECONDS,
                largeQueue.getStealFromQueue(),
                new ThreadFactoryBuilder().setNameFormat(namePrefix + "-shortCompactions-%d").setDaemon(true).build());
        this.shortCompactions.setRejectedExecutionHandler(new Rejection());
    }

    /**
     * @return a one-line summary of the current large/small compaction and split queue sizes
     */
    @Override
    public String toString() {
        final int largeQueued = longCompactions.getQueue().size();
        final int smallQueued = shortCompactions.getQueue().size();
        final int splitQueued = splits.getQueue().size();
        return "compactionQueue=(longCompactions=" + largeQueued + ":shortCompactions=" + smallQueued + ")"
                + ", splitQueue=" + splitQueued;
    }

    /**
     * Renders the pending entries of the large compaction, small compaction and split queues,
     * one entry per line under a heading for each queue.
     *
     * @return the multi-line dump
     */
    public String dumpQueue() {
        final StringBuilder dump = new StringBuilder();
        dump.append("Compaction/Split Queue dump:\n");

        dump.append("  LargeCompation Queue:\n");
        for(Runnable queued : longCompactions.getQueue()) {
            dump.append("    " + queued.toString()).append("\n");
        }

        // Only the small pool is null-checked, mirroring the historical behaviour.
        if(shortCompactions != null) {
            dump.append("\n").append("  SmallCompation Queue:\n");
            for(Runnable queued : shortCompactions.getQueue()) {
                dump.append("    " + queued.toString()).append("\n");
            }
        }

        dump.append("\n").append("  Split Queue:\n");
        for(Runnable queued : splits.getQueue()) {
            dump.append("    " + queued.toString()).append("\n");
        }

        return dump.toString();
    }

    /**
     * Entry point for splitting: asks the region for a split point and, if it has one,
     * queues a split request.
     *
     * @param r the region to consider; must be an {@link HRegion}
     * @return {@code true} if a split was requested, {@code false} if the server is at its
     *         region limit, the region's compact priority is below {@code PRIORITY_USER},
     *         or the region reported no split point
     */
    public synchronized boolean requestSplit(final Region r) {
        if(!shouldSplitRegion()) {
            return false;
        }
        final HRegion region = (HRegion) r;
        // don't split regions that are blocking
        if(region.getCompactPriority() < PRIORITY_USER) {
            return false;
        }
        final byte[] splitPoint = region.checkSplit();
        if(splitPoint == null) {
            return false;
        }
        requestSplit(r, splitPoint);
        return true;
    }

    /**
     * Queues a split of {@code r} at {@code midKey} without a specific user identity.
     *
     * @param r      the region to split
     * @param midKey the split point; {@code null} means the region is not splittable
     */
    public synchronized void requestSplit(final Region r, byte[] midKey) {
        final User noUser = null;
        requestSplit(r, midKey, noUser);
    }

    /**
     * Queues a split of {@code r} at {@code midKey} on the split pool.
     *
     * If {@code midKey} is {@code null} nothing is queued, and a pending forced split on the
     * region is cleared. A rejection by the split pool is logged and otherwise ignored.
     *
     * @param r      the region to split; must be an {@link HRegion}
     * @param midKey the split point, or {@code null} if the region is not splittable
     * @param user   allows the split thread to assume the correct user identity; may be null
     */
    public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
        if(midKey == null) {
            LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + " not splittable because midkey=null");
            final HRegion region = (HRegion) r;
            if(region.shouldForceSplit()) {
                region.clearSplit();
            }
            return;
        }

        try {
            final SplitRequest request = new SplitRequest(r, midKey, this.server, user);
            this.splits.execute(request);
            if(LOG.isDebugEnabled()) {
                LOG.debug("Splitting {}, {}", r, this);
            }
        } catch(RejectedExecutionException ree) {
            LOG.info("Could not execute split for " + r, ree);
        }
    }

    /**
     * Stops both compaction pools immediately via {@code shutdownNow()}.
     */
    private void interrupt() {
        this.longCompactions.shutdownNow();
        this.shortCompactions.shutdownNow();
    }

    /**
     * Replaces the compaction pools with freshly created ones.
     */
    private void reInitializeCompactionsExecutors() {
        this.createCompactionExecutors();
    }

    /**
     * Callback invoked once per store when that store's compaction request has finished or
     * has been dropped. The default implementation does nothing.
     */
    private interface CompactionCompleteTracker {

        /**
         * @param store the store whose compaction request is done
         */
        default void completed(Store store) {
            // intentionally empty
        }
    }

    // Shared no-op tracker: relies entirely on the interface's empty default method.
    private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = new CompactionCompleteTracker() {
    };

    /**
     * Counts down per-store completions and notifies the wrapped life-cycle tracker once,
     * when the count reaches zero.
     */
    private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {

        private final CompactionLifeCycleTracker tracker;
        private final AtomicInteger remaining;

        /**
         * @param tracker        the life-cycle tracker to notify when every store has reported
         * @param numberOfStores how many {@link #completed(Store)} calls to wait for
         */
        public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
            this.tracker = tracker;
            this.remaining = new AtomicInteger(numberOfStores);
        }

        @Override
        public void completed(Store store) {
            final int stillPending = remaining.decrementAndGet();
            if(stillPending != 0) {
                return;
            }
            tracker.completed();
        }
    }

    /**
     * Picks the completion tracker to pair with {@code tracker}.
     *
     * @param tracker        the caller's life-cycle tracker
     * @param numberOfStores supplies the number of stores involved; only evaluated when an
     *                       aggregating tracker is actually needed
     * @return the shared no-op tracker for {@code CompactionLifeCycleTracker.DUMMY}, otherwise
     *         a new aggregating tracker
     */
    private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, IntSupplier numberOfStores) {
        // Usually nobody cares about the life cycle of a compaction, so avoid allocating then.
        final boolean lifeCycleIgnored = tracker == CompactionLifeCycleTracker.DUMMY;
        return lifeCycleIgnored
                ? DUMMY_COMPLETE_TRACKER
                : new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
    }

    /**
     * Requests a compaction on every store of {@code region}, selecting files immediately.
     * The life-cycle tracker is completed once all column families have reported.
     */
    @Override
    public synchronized void requestCompaction(HRegion region, String why, int priority, CompactionLifeCycleTracker tracker,
            User user) throws IOException {
        final IntSupplier familyCount = () -> region.getTableDescriptor().getColumnFamilyCount();
        final CompactionCompleteTracker completeTracker = getCompleteTracker(tracker, familyCount);
        requestCompactionInternal(region, why, priority, true, tracker, completeTracker, user);
    }

    /**
     * Requests a compaction on a single store of {@code region}, selecting files immediately.
     */
    @Override
    public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, CompactionLifeCycleTracker tracker,
            User user) throws IOException {
        final CompactionCompleteTracker completeTracker = getCompleteTracker(tracker, () -> 1);
        requestCompactionInternal(region, store, why, priority, true, tracker, completeTracker, user);
    }

    /**
     * Turns compactions on or off at runtime.
     *
     * Switching off shuts the compaction pools down immediately. Switching on re-creates
     * them, but only if compactions were disabled beforehand.
     *
     * @param onOrOff {@code true} to enable compactions, {@code false} to disable them
     */
    @Override
    public void switchCompaction(boolean onOrOff) {
        if(!onOrOff) {
            LOG.info("Interrupting running compactions because user switched off compactions");
            interrupt();
        } else if(!isCompactionsEnabled()) {
            // The pools were shut down when compactions were disabled; build new ones.
            LOG.info("Re-Initializing compactions because user switched on compactions");
            reInitializeCompactionsExecutors();
        }
        setCompactionsEnabled(onOrOff);
    }

    /**
     * Requests a compaction on every store of {@code region} by delegating to the per-store
     * overload with the same arguments.
     */
    private void requestCompactionInternal(HRegion region, String why, int priority, boolean selectNow, CompactionLifeCycleTracker tracker,
            CompactionCompleteTracker completeTracker, User user) throws IOException {
        final Iterator<? extends HStore> stores = region.stores.values().iterator();
        while(stores.hasNext()) {
            requestCompactionInternal(region, stores.next(), why, priority, selectNow, tracker, completeTracker, user);
        }
    }

    /**
     * Queues a compaction for one store.
     *
     * The request is dropped silently when the server is stopped or the table has
     * compactions disabled. It is dropped with a tracker notification when a space quota
     * violation policy forbids compactions for a non-superuser request.
     *
     * @param selectNow {@code true} to select the files up front and route the request to the
     *                  large or small pool by size; {@code false} to queue on the small pool
     *                  and select when the runner executes
     */
    private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, boolean selectNow,
            CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) throws IOException {
        if(this.server.isStopped()) {
            return;
        }
        if(region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled()) {
            return;
        }

        final RegionServerSpaceQuotaManager quotaManager = this.server.getRegionServerSpaceQuotaManager();
        // True only when: the request carries a user, that user is NOT a superuser, quotas
        // are enabled, and the quota policy has disabled compactions for this table.
        final boolean blockedByQuota = user != null && !Superusers.isSuperUser(user) && quotaManager != null
                && quotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName());
        if(blockedByQuota) {
            String reason = "Ignoring compaction request for " + region + " as an active space quota violation " + " policy disallows compactions.";
            tracker.notExecuted(store, reason);
            completeTracker.completed(store);
            LOG.debug(reason);
            return;
        }

        final CompactionContext compaction;
        final ThreadPoolExecutor pool;
        if(selectNow) {
            final Optional<CompactionContext> selected = selectCompaction(region, store, priority, tracker, completeTracker, user);
            if(!selected.isPresent()) {
                // message logged inside
                return;
            }
            compaction = selected.get();
            pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
        } else {
            // We assume that most compactions are small. So, put system compactions into small
            // pool; we will do selection there, and move to large pool if necessary.
            compaction = null;
            pool = shortCompactions;
        }

        pool.execute(new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
        region.incrementCompactionsQueuedCount();

        if(LOG.isDebugEnabled()) {
            final String poolLabel = (pool == shortCompactions) ? "Small " : "Large ";
            final String what = selectNow ? compaction.toString() : "system";
            final String because = (why != null && !why.isEmpty()) ? "; Because: " + why : "";
            LOG.debug(poolLabel + "Compaction requested: " + what + because + "; " + this);
        }
    }

    /**
     * Queues a system compaction on every store of {@code region}: no priority, no user,
     * no life-cycle tracking, and file selection deferred until the runner executes.
     *
     * @param region the region to compact
     * @param why    the reason, used only in debug logging
     */
    public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
        final CompactionLifeCycleTracker noLifeCycle = CompactionLifeCycleTracker.DUMMY;
        requestCompactionInternal(region, why, NO_PRIORITY, false, noLifeCycle, DUMMY_COMPLETE_TRACKER, null);
    }

    /**
     * Queues a system compaction on a single store: no priority, no user, no life-cycle
     * tracking, and file selection deferred until the runner executes.
     *
     * @param region the region owning {@code store}
     * @param store  the store to compact
     * @param why    the reason, used only in debug logging
     */
    public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException {
        final CompactionLifeCycleTracker noLifeCycle = CompactionLifeCycleTracker.DUMMY;
        requestCompactionInternal(region, store, why, NO_PRIORITY, false, noLifeCycle, DUMMY_COMPLETE_TRACKER, null);
    }

    /**
     * Asks the store to select files for a compaction.
     *
     * Whenever no compaction is returned because compactions are disabled, or because the
     * store cancelled the request, both trackers are told that this store will not be
     * compacted. Without that, an aggregating tracker would wait for this store forever.
     *
     * @param region          the region owning {@code store}
     * @param store           the store to select files from
     * @param priority        the priority passed on to the store
     * @param tracker         life-cycle tracker notified when nothing is executed
     * @param completeTracker completion tracker notified when nothing is executed
     * @param user            the requesting user; may be null
     * @return the selected compaction, or empty if there is nothing to run
     * @throws IOException if the store fails while selecting
     */
    private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, CompactionLifeCycleTracker tracker,
            CompactionCompleteTracker completeTracker, User user) throws IOException {
        // don't even select for compaction if disableCompactions is set to true
        if(!isCompactionsEnabled()) {
            final String reason = "User has disabled compactions";
            LOG.info(reason);
            // Report the skipped store, as the other "not executed" paths do.
            tracker.notExecuted(store, reason);
            completeTracker.completed(store);
            return Optional.empty();
        }

        Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);

        if(!compaction.isPresent() && region.getRegionInfo() != null) {
            String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + " because compaction request was cancelled";
            tracker.notExecuted(store, reason);
            completeTracker.completed(store);
            LOG.debug(reason);
        }
        return compaction;
    }

    /**
     * Starts an orderly shutdown of the split and compaction pools via {@code shutdown()}.
     */
    void interruptIfNecessary() {
        this.splits.shutdown();
        this.longCompactions.shutdown();
        this.shortCompactions.shutdown();
    }

    /**
     * Blocks until {@code t} has terminated.
     *
     * Waits in 60-second rounds. After a round that times out, or after being interrupted,
     * the pool is told to {@code shutdownNow()} and the wait continues. If the calling
     * thread was interrupted while waiting, its interrupt status is restored before
     * returning so that callers can still observe it.
     *
     * @param t    the executor to wait for; expected to have been shut down already
     * @param name human-readable pool name used in log messages
     */
    private void waitFor(ThreadPoolExecutor t, String name) {
        boolean interrupted = false;
        try {
            boolean done = false;
            while(!done) {
                try {
                    // Log before blocking so the message describes what is about to happen.
                    LOG.info("Waiting for " + name + " to finish...");
                    done = t.awaitTermination(60, TimeUnit.SECONDS);
                    if(!done) {
                        t.shutdownNow();
                    }
                } catch(InterruptedException ie) {
                    // awaitTermination cleared the flag; remember it and keep waiting.
                    interrupted = true;
                    LOG.warn("Interrupted waiting for " + name + " to finish...");
                    t.shutdownNow();
                }
            }
        } finally {
            if(interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Waits for the split pool, then the large and the small compaction pool, to terminate.
     */
    void join() {
        waitFor(this.splits, "Split Thread");
        waitFor(this.longCompactions, "Large Compaction Thread");
        waitFor(this.shortCompactions, "Small Compaction Thread");
    }

    /**
     * Returns the number of compaction requests waiting in the large and small queues
     * combined.
     *
     * @return the sum of both compaction queue sizes
     */
    public int getCompactionQueueSize() {
        final int largeQueued = longCompactions.getQueue().size();
        final int smallQueued = shortCompactions.getQueue().size();
        return largeQueued + smallQueued;
    }

    /**
     * @return the number of requests waiting in the large compaction queue
     */
    public int getLargeCompactionQueueSize() {
        return this.longCompactions.getQueue().size();
    }


    /**
     * @return the number of requests waiting in the small compaction queue
     */
    public int getSmallCompactionQueueSize() {
        return this.shortCompactions.getQueue().size();
    }

    /**
     * @return the number of requests waiting in the split queue
     */
    public int getSplitQueueSize() {
        return this.splits.getQueue().size();
    }

    /**
     * Decides whether this server may split another region, warning when the online region
     * count exceeds 90% of the split limit.
     *
     * @return {@code true} while the online region count is below the split limit
     */
    private boolean shouldSplitRegion() {
        final boolean nearLimit = server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit;
        if(nearLimit) {
            LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
                    + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
        }
        final boolean belowLimit = regionSplitLimit > server.getNumberOfOnlineRegions();
        return belowLimit;
    }

    /**
     * @return the online-region count at or above which split requests are refused
     */
    public int getRegionSplitLimit() {
        return regionSplitLimit;
    }

    /**
     * Ordering used by the compaction queue.
     *
     * {@link CompactionRunner}s sort before any other runnable. Two runners are ordered by
     * queued priority (lower first). A runner that already has a selected compaction sorts
     * before one that does not. Two selected compactions are ordered by request priority,
     * then selection time, then identity hash code.
     *
     * Hash-code tie-breaks use {@link Integer#compare(int, int)} rather than subtraction, so
     * the result cannot flip sign through integer overflow.
     */
    private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() {

        private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
            if(r1 == r2) {
                return 0; //they are the same request
            }
            // less first
            int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
            if(cmp != 0) {
                return cmp;
            }
            cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
            if(cmp != 0) {
                return cmp;
            }

            // break the tie based on hash code
            return Integer.compare(System.identityHashCode(r1), System.identityHashCode(r2));
        }

        @Override
        public int compare(Runnable r1, Runnable r2) {
            // CompactionRunner first
            if(r1 instanceof CompactionRunner) {
                if(!(r2 instanceof CompactionRunner)) {
                    return -1;
                }
            } else {
                if(r2 instanceof CompactionRunner) {
                    return 1;
                } else {
                    // break the tie based on hash code
                    return Integer.compare(System.identityHashCode(r1), System.identityHashCode(r2));
                }
            }
            CompactionRunner o1 = (CompactionRunner) r1;
            CompactionRunner o2 = (CompactionRunner) r2;
            // less first
            int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
            if(cmp != 0) {
                return cmp;
            }
            CompactionContext c1 = o1.compaction;
            CompactionContext c2 = o2.compaction;
            // A runner with an already-selected compaction goes ahead of one without.
            if(c1 != null) {
                return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
            } else {
                return c2 != null ? 1 : 0;
            }
        }
    };

    /**
     * Task that runs one compaction of one store on a compaction pool.
     *
     * When constructed without a {@link CompactionContext} the files are selected when the
     * task executes, and the task may re-queue itself with a new priority or on the large
     * pool.
     */
    private final class CompactionRunner implements Runnable {
        private final HStore store;
        private final HRegion region;
        // Pre-selected compaction, or null when selection is deferred until execution.
        private final CompactionContext compaction;
        private final CompactionLifeCycleTracker tracker;
        private final CompactionCompleteTracker completeTracker;
        // Priority this task was queued with; read by COMPARATOR to order the queue.
        private int queuedPriority;
        // Pool this task is currently queued on; changes if the task moves to the large pool.
        private ThreadPoolExecutor parent;
        private User user;
        // Creation time of this task, reported as "startTime" by toString().
        private long time;

        /**
         * @param compaction the pre-selected compaction, or null to select at execution time;
         *                   when null the queued priority is taken from the store instead of
         *                   the compaction request
         * @param parent     the pool this task is about to be queued on
         */
        public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, CompactionLifeCycleTracker tracker,
                CompactionCompleteTracker completeTracker, ThreadPoolExecutor parent, User user) {
            this.store = store;
            this.region = region;
            this.compaction = compaction;
            this.tracker = tracker;
            this.completeTracker = completeTracker;
            this.queuedPriority = compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
            this.parent = parent;
            this.user = user;
            this.time = EnvironmentEdgeManager.currentTime();
        }

        @Override
        public String toString() {
            if(compaction != null) {
                return "Request=" + compaction.getRequest();
            } else {
                return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority=" + queuedPriority + ", startTime=" + time;
            }
        }

        /**
         * Selects files if that has not happened yet, then runs the compaction and follows
         * up with either another system compaction or a split check.
         *
         * The region's queued-compaction count is decremented on every path that finishes
         * or abandons the request, but not on the two paths that re-queue this task.
         */
        private void doCompaction(User user) {
            CompactionContext c;
            // Common case - system compaction without a file selection. Select now.
            if(compaction == null) {
                int oldPriority = this.queuedPriority;
                this.queuedPriority = this.store.getCompactPriority();
                if(this.queuedPriority > oldPriority) {
                    // Store priority decreased while we were in queue (due to some other compaction?),
                    // requeue with new priority to avoid blocking potential higher priorities.
                    this.parent.execute(this);
                    return;
                }
                Optional<CompactionContext> selected;
                try {

                    // Select the files to compact now that this task has reached the front of the queue.
                    selected = selectCompaction(this.region, this.store, queuedPriority, tracker, completeTracker, user);
                } catch(IOException ex) {
                    LOG.error("Compaction selection failed " + this, ex);
                    server.checkFileSystem();
                    region.decrementCompactionsQueuedCount();
                    return;
                }
                if(!selected.isPresent()) {
                    region.decrementCompactionsQueuedCount();
                    return; // nothing to do
                }
                c = selected.get();
                assert c.hasSelection();

                // Now see if we are in correct pool for the size; if not, go to the correct one.
                // We might end up waiting for a while, so cancel the selection.
                ThreadPoolExecutor pool = store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;

                // Long compaction pool can process small job
                // Short compaction pool should not process large job
                if(this.parent == shortCompactions && pool == longCompactions) {
                    this.store.cancelRequestedCompaction(c);
                    this.parent = pool;
                    this.parent.execute(this);
                    return;
                }
            } else {
                c = compaction;
            }
            // Finally we can compact something.
            assert c != null;

            tracker.beforeExecution(store);
            try {
                // Note: please don't put single-compaction logic here;
                //       put it into region/store/etc. This is CST logic.
                long start = EnvironmentEdgeManager.currentTime();

                // This call is the core of the compaction: the region does the actual work.
                boolean completed = region.compact(c, store, compactionThroughputController, user);

                long now = EnvironmentEdgeManager.currentTime();
                LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + this + "; duration=" + StringUtils.formatTimeDiff(now, start));
                if(completed) {
                    // degenerate case: blocked regions require recursive enqueues
                    if(store.getCompactPriority() <= 0) {
                        requestSystemCompaction(region, store, "Recursive enqueue");
                    } else {

                        // see if the compaction has caused us to exceed max region size
                        requestSplit(region);
                    }
                }
            } catch(IOException ex) {
                // Log the unwrapped remote cause; the original call stack is logged separately below.
                IOException remoteEx = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
                LOG.error("Compaction failed " + this, remoteEx);
                if(remoteEx != ex) {
                    LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
                }
                region.reportCompactionRequestFailure();
                server.checkFileSystem();
            } catch(Exception ex) {
                LOG.error("Compaction failed " + this, ex);
                region.reportCompactionRequestFailure();
                server.checkFileSystem();
            } finally {
                // Runs on success and failure alike so trackers and the queued count stay balanced.
                tracker.afterExecution(store);
                completeTracker.completed(store);
                region.decrementCompactionsQueuedCount();
                LOG.debug("Status {}", CompactSplit.this);
            }
        }

        /**
         * Drops the request (decrementing the queued count) if the server has stopped or the
         * table has compactions disabled; otherwise performs the compaction.
         */
        @Override
        public void run() {
            Preconditions.checkNotNull(server);
            if(server.isStopped() || (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())) {
                region.decrementCompactionsQueuedCount();
                return;
            }

            doCompaction(user);
        }

        /**
         * @return the stack trace of {@code ex} rendered as a string
         */
        private String formatStackTrace(Exception ex) {
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            ex.printStackTrace(pw);
            pw.flush();
            return sw.toString();
        }
    }

    /**
     * Handler for compaction tasks the pool refuses to accept. If the rejected task already
     * holds a selected compaction, that selection is cancelled on its store. Anything that is
     * not a {@link CompactionRunner} is ignored.
     */
    private static class Rejection implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
            if(!(runnable instanceof CompactionRunner)) {
                return;
            }
            final CompactionRunner rejected = (CompactionRunner) runnable;
            LOG.debug("Compaction Rejected: " + rejected);
            if(rejected.compaction == null) {
                // Nothing was selected yet, so there is nothing to hand back to the store.
                return;
            }
            rejected.store.cancelRequestedCompaction(rejected.compaction);
        }
    }

    /**
     * {@inheritDoc}
     *
     * Resizes the large compaction, small compaction and split pools to the thread counts
     * in {@code newConf}, replaces the compaction throughput controller, and reloads this
     * instance's configuration.
     *
     * A thread count below 1 for the small or split pool is ignored with a warning, so that
     * an invalid value cannot leave a pool half-resized or abort the rest of the update. The
     * large pool count is clamped to at least 1.
     */
    @Override
    public void onConfigurationChange(Configuration newConf) {
        final int largeThreads = Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
        resizePool(this.longCompactions, LARGE_COMPACTION_THREADS, largeThreads);

        final int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
        resizePool(this.shortCompactions, SMALL_COMPACTION_THREADS, smallThreads);

        final int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
        resizePool(this.splits, SPLIT_THREADS, splitThreads);

        ThroughputController old = this.compactionThroughputController;
        if(old != null) {
            old.stop("configuration change");
        }
        this.compactionThroughputController = CompactionThroughputControllerFactory.create(server, newConf);

        // We change this atomically here instead of reloading the config in order that upstream
        // would be the only one with the flexibility to reload the config.
        this.conf.reloadConfiguration();
    }

    /**
     * Sets both the core and the maximum size of {@code pool} to {@code newSize}, if that
     * differs from the current core size.
     *
     * The two setters are called in the order that keeps core &lt;= maximum at every step:
     * maximum first when growing, core first when shrinking.
     *
     * @param pool    the executor to resize
     * @param confKey the configuration key the size came from, for log messages
     * @param newSize the requested thread count; values below 1 are rejected with a warning
     */
    private static void resizePool(ThreadPoolExecutor pool, String confKey, int newSize) {
        final int currentSize = pool.getCorePoolSize();
        if(currentSize == newSize) {
            return;
        }
        if(newSize < 1) {
            LOG.warn("Ignoring invalid value " + newSize + " for " + confKey + "; keeping " + currentSize);
            return;
        }
        LOG.info("Changing the value of " + confKey + " from " + currentSize + " to " + newSize);
        if(currentSize < newSize) {
            pool.setMaximumPoolSize(newSize);
            pool.setCorePoolSize(newSize);
        } else {
            pool.setCorePoolSize(newSize);
            pool.setMaximumPoolSize(newSize);
        }
    }

    /**
     * @return the core pool size of the small compaction pool
     */
    protected int getSmallCompactionThreadNum() {
        return shortCompactions.getCorePoolSize();
    }

    /**
     * @return the core pool size of the large compaction pool
     */
    protected int getLargeCompactionThreadNum() {
        return longCompactions.getCorePoolSize();
    }

    /**
     * @return the core pool size of the split pool
     */
    protected int getSplitThreadNum() {
        return splits.getCorePoolSize();
    }

    /**
     * {@inheritDoc}
     *
     * This observer has no child observers, so there is nothing to register.
     */
    @Override
    public void registerChildren(ConfigurationManager manager) {
        // Intentionally empty: no children.
    }

    /**
     * {@inheritDoc}
     *
     * This observer has no child observers, so there is nothing to deregister.
     */
    @Override
    public void deregisterChildren(ConfigurationManager manager) {
        // Intentionally empty: no children.
    }

    /**
     * @return the throughput controller currently handed to compactions
     */
    @VisibleForTesting
    public ThroughputController getCompactionThroughputController() {
        return this.compactionThroughputController;
    }

    /**
     * Shutdown the long compaction thread pool.
     * Should only be used in unit test to prevent long compaction thread pool from stealing job
     * from short compaction queue
     */
    @VisibleForTesting
    void shutdownLongCompactions() {
        longCompactions.shutdown();
    }

    /**
     * Removes every pending request from the large compaction queue.
     */
    public void clearLongCompactionsQueue() {
        this.longCompactions.getQueue().clear();
    }

    /**
     * Removes every pending request from the small compaction queue.
     */
    public void clearShortCompactionsQueue() {
        this.shortCompactions.getQueue().clear();
    }

    /**
     * @return {@code true} if compactions are currently switched on
     */
    public boolean isCompactionsEnabled() {
        return this.compactionsEnabled;
    }

    /**
     * Records the compaction switch both in this instance and in its configuration under
     * {@link #HBASE_REGION_SERVER_ENABLE_COMPACTION}.
     *
     * @param compactionsEnabled the new state of the switch
     */
    public void setCompactionsEnabled(boolean compactionsEnabled) {
        this.compactionsEnabled = compactionsEnabled;
        final String asText = Boolean.toString(compactionsEnabled);
        this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION, asText);
    }

    /**
     * @return the executor that runs large compactions
     */
    ThreadPoolExecutor getLongCompactions() {
        return this.longCompactions;
    }

    /**
     * @return the executor that runs small compactions
     */
    ThreadPoolExecutor getShortCompactions() {
        return this.shortCompactions;
    }

}
